package com.dlink.executor;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.table.typeutils.FieldInfoUtils;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Customized {@link TableEnvironmentImpl}.
 *
 * <p>On top of the base table environment this class can turn SQL statements into a
 * {@link StreamGraph}, {@link JobGraph} or {@link JobPlanInfo} without executing them, produce a
 * structured explain record for a statement, and register a {@link DataStream} as a table or
 * temporary view.
 *
 * @author wenmo
 * @since 2021/6/7 22:06
 **/
public class CustomTableEnvironmentImpl extends TableEnvironmentImpl implements CustomTableEnvironment {

    /** Shared JSON mapper; reused so that a new mapper is not built on every call. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Stream environment this table environment was created for. */
    private final StreamExecutionEnvironment executionEnvironment;

    /**
     * Creates the environment from fully prepared components.
     *
     * <p>All arguments except {@code executionEnvironment} are handed to the
     * {@link TableEnvironmentImpl} constructor; {@code executionEnvironment} is kept for the
     * time-characteristic validation done when converting data streams.
     */
    public CustomTableEnvironmentImpl(
        CatalogManager catalogManager,
        ModuleManager moduleManager,
        FunctionCatalog functionCatalog,
        TableConfig tableConfig,
        StreamExecutionEnvironment executionEnvironment,
        Planner planner,
        Executor executor,
        boolean isStreamingMode,
        ClassLoader userClassLoader) {
        super(
            catalogManager,
            moduleManager,
            tableConfig,
            executor,
            functionCatalog,
            planner,
            isStreamingMode,
            userClassLoader);
        this.executionEnvironment = executionEnvironment;
    }

    /**
     * Creates an environment with default {@link EnvironmentSettings} and a fresh
     * {@link TableConfig}.
     *
     * @param executionEnvironment stream environment to bind to
     * @return the new table environment
     */
    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
        return create(executionEnvironment, EnvironmentSettings.newInstance().build());
    }

    /**
     * Creates an environment configured for batch execution.
     *
     * <p>The table configuration gets {@code execution.runtime-mode=BATCH} and the settings are
     * built with the Blink planner in batch mode.
     *
     * @param executionEnvironment stream environment to bind to
     * @return the new table environment
     */
    public static CustomTableEnvironmentImpl createBatch(StreamExecutionEnvironment executionEnvironment) {
        Configuration configuration = new Configuration();
        configuration.setString("execution.runtime-mode", "BATCH");
        TableConfig tableConfig = new TableConfig();
        tableConfig.addConfiguration(configuration);
        EnvironmentSettings batchSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        // Deliberately skips the public create(...) overload: that one rejects every
        // non-streaming setting, which made this factory throw unconditionally.
        return newEnvironment(executionEnvironment, batchSettings, tableConfig);
    }

    /**
     * Creates an environment with the given settings and a fresh {@link TableConfig}.
     *
     * @param executionEnvironment stream environment to bind to
     * @param settings             environment settings; must be in streaming mode
     * @return the new table environment
     * @throws TableException if {@code settings} is not in streaming mode
     */
    public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
        return create(executionEnvironment, settings, new TableConfig());
    }

    /**
     * Creates an environment with the given settings and table configuration.
     *
     * @param executionEnvironment stream environment to bind to
     * @param settings             environment settings; must be in streaming mode
     * @param tableConfig          table configuration shared by catalog, functions and planner
     * @return the new table environment
     * @throws TableException if {@code settings} is not in streaming mode
     */
    public static CustomTableEnvironmentImpl create(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {

        if (!settings.isStreamingMode()) {
            throw new TableException(
                "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
        }
        return newEnvironment(executionEnvironment, settings, tableConfig);
    }

    /**
     * Wires catalog manager, function catalog, executor and planner together and builds the
     * environment. Performs no streaming/batch check; callers decide which modes they allow.
     */
    private static CustomTableEnvironmentImpl newEnvironment(
        StreamExecutionEnvironment executionEnvironment,
        EnvironmentSettings settings,
        TableConfig tableConfig) {

        // temporary solution until FLINK-15635 is fixed
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        ModuleManager moduleManager = new ModuleManager();

        CatalogManager catalogManager =
            CatalogManager.newBuilder()
                .classLoader(classLoader)
                .config(tableConfig.getConfiguration())
                .defaultCatalog(
                    settings.getBuiltInCatalogName(),
                    new GenericInMemoryCatalog(
                        settings.getBuiltInCatalogName(),
                        settings.getBuiltInDatabaseName()))
                .executionConfig(executionEnvironment.getConfig())
                .build();

        FunctionCatalog functionCatalog =
            new FunctionCatalog(tableConfig, catalogManager, moduleManager);

        Map<String, String> executorProperties = settings.toExecutorProperties();
        Executor executor = lookupExecutor(executorProperties, executionEnvironment);

        Map<String, String> plannerProperties = settings.toPlannerProperties();
        Planner planner =
            ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                .create(
                    plannerProperties,
                    executor,
                    tableConfig,
                    functionCatalog,
                    catalogManager);

        return new CustomTableEnvironmentImpl(
            catalogManager,
            moduleManager,
            functionCatalog,
            tableConfig,
            executionEnvironment,
            planner,
            executor,
            settings.isStreamingMode(),
            classLoader);
    }

    /**
     * Finds the {@link ExecutorFactory} matching the given properties and reflectively calls its
     * {@code create(Map, StreamExecutionEnvironment)} method.
     *
     * @throws TableException if the factory cannot be found or the reflective call fails
     */
    private static Executor lookupExecutor(
        Map<String, String> executorProperties,
        StreamExecutionEnvironment executionEnvironment) {
        try {
            ExecutorFactory executorFactory =
                ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            // The two-argument create(...) is not part of the ExecutorFactory interface used
            // here, hence the reflective lookup on the concrete factory class.
            Method createMethod =
                executorFactory
                    .getClass()
                    .getMethod("create", Map.class, StreamExecutionEnvironment.class);

            return (Executor)
                createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
        } catch (Exception e) {
            throw new TableException(
                "Could not instantiate the executor. Make sure a planner module is on the classpath",
                e);
        }
    }

    /**
     * Parses a single statement, translates it and returns the resulting stream graph as JSON.
     *
     * <p>Only a {@link ModifyOperation} is passed on to the planner; any other kind of
     * operation results in an empty operation list being translated.
     *
     * @param statement SQL text that must parse to exactly one operation
     * @return the stream graph JSON as an object node
     * @throws TableException if the statement does not parse to exactly one operation, the
     *                        executor is not an {@link ExecutorBase}, or the generated JSON
     *                        cannot be parsed
     */
    public ObjectNode getStreamGraph(String statement) {
        List<Operation> operations = super.parser.parse(statement);
        if (operations.size() != 1) {
            throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
        }

        List<ModifyOperation> modifyOperations = new ArrayList<>();
        Operation operation = operations.get(0);
        if (operation instanceof ModifyOperation) {
            modifyOperations.add((ModifyOperation) operation);
        }

        List<Transformation<?>> trans = super.planner.translate(modifyOperations);
        if (!(execEnv instanceof ExecutorBase)) {
            throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
        }

        StreamGraph streamGraph =
            ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
        String json = new JSONGenerator(streamGraph).getJSON();
        try {
            // No return-in-finally here: that construct would discard any exception raised
            // while reading the JSON and hand back an empty node instead.
            return (ObjectNode) MAPPER.readTree(json);
        } catch (JsonProcessingException e) {
            throw new TableException("Could not parse the generated stream graph JSON.", e);
        }
    }

    /**
     * Builds the JSON job plan for the given statements.
     *
     * @param statements SQL statements accepted by {@link #getJobGraphFromInserts(List)}
     * @return plan information wrapping the generated JSON plan
     */
    public JobPlanInfo getJobPlanInfo(List<String> statements) {
        return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
    }

    /**
     * Translates the given statements into one stream graph.
     *
     * <p>If the table configuration contains {@code pipeline.name}, it is used as the job name.
     *
     * @param statements SQL texts; each must parse to exactly one {@link ModifyOperation}
     * @return the stream graph covering all statements
     * @throws TableException if a statement parses to more or less than one operation, is not a
     *                        modify operation, or the executor is not an {@link ExecutorBase}
     */
    public StreamGraph getStreamGraphFromInserts(List<String> statements) {
        List<ModifyOperation> modifyOperations = new ArrayList<>();
        for (String statement : statements) {
            List<Operation> operations = getParser().parse(statement);
            if (operations.size() != 1) {
                throw new TableException("Only single statement is supported.");
            }
            Operation operation = operations.get(0);
            if (!(operation instanceof ModifyOperation)) {
                throw new TableException("Only insert statement is supported now.");
            }
            modifyOperations.add((ModifyOperation) operation);
        }

        List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
        if (!(execEnv instanceof ExecutorBase)) {
            throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
        }

        StreamGraph streamGraph =
            ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
        if (tableConfig.getConfiguration().containsKey(PipelineOptions.NAME.key())) {
            streamGraph.setJobName(tableConfig.getConfiguration().getString(PipelineOptions.NAME));
        }
        return streamGraph;
    }

    /**
     * Translates the given statements into a job graph.
     *
     * @param statements SQL statements accepted by {@link #getStreamGraphFromInserts(List)}
     * @return the job graph derived from the stream graph
     */
    public JobGraph getJobGraphFromInserts(List<String> statements) {
        return getStreamGraphFromInserts(statements).getJobGraph();
    }

    /**
     * Parses a single statement and records its type and explanation.
     *
     * <p>The type is {@code "Modify DML"}, {@code "Explain DML"} or {@code "Query DML"} for the
     * corresponding operation classes and {@code "DDL"} for everything else. DDL operations are
     * explained by their summary string; all others by the planner.
     *
     * @param statement    SQL text that must parse to exactly one operation
     * @param extraDetails extra details forwarded to the planner's explain call
     * @return the filled explain record
     * @throws TableException if the statement does not parse to exactly one operation
     */
    public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
        SqlExplainResult result = new SqlExplainResult();
        List<Operation> operations = parser.parse(statement);
        result.setParseTrue(true);
        if (operations.size() != 1) {
            throw new TableException(
                "Unsupported SQL query! explainSql() only accepts a single SQL query.");
        }

        Operation operation = operations.get(0);
        if (operation instanceof ModifyOperation) {
            result.setType("Modify DML");
        } else if (operation instanceof ExplainOperation) {
            result.setType("Explain DML");
        } else if (operation instanceof QueryOperation) {
            result.setType("Query DML");
        } else {
            // Everything else is reported as DDL and is not sent to the planner.
            result.setExplain(operation.asSummaryString());
            result.setType("DDL");
            result.setExplainTrue(true);
            return result;
        }

        result.setExplainTrue(true);
        result.setExplain(planner.explain(operations, extraDetails));
        return result;
    }

    /**
     * Not implemented in this class: ignores all arguments.
     *
     * @return always {@code false}
     */
    public boolean parseAndLoadConfiguration(String statement, StreamExecutionEnvironment environment, Map<String, Object> setMap) {
        return false;
    }

    /**
     * Converts a data stream into a table using the given field expressions.
     *
     * @param dataStream stream to convert
     * @param fields     field expressions describing the table columns
     * @return the table backed by the stream
     * @throws ValidationException if a rowtime attribute is defined while the stream
     *                             environment does not use event time
     */
    public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
        JavaDataStreamQueryOperation<T> queryOperation =
            asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));

        return createTable(queryOperation);
    }

    /**
     * Converts a data stream into a table using a field expression list given as text.
     *
     * @param dataStream stream to convert
     * @param fields     expression list parsed with {@link ExpressionParser}
     * @return the table backed by the stream
     */
    public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
        List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
        return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
    }

    /**
     * Registers a data stream as a temporary view, with the fields given as text.
     *
     * @param path       path under which the view is registered
     * @param dataStream stream to register
     * @param fields     expression list parsed with {@link ExpressionParser}
     */
    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    /**
     * Registers a data stream as a temporary view, with the fields given as expressions.
     *
     * @param path       path under which the view is registered
     * @param dataStream stream to register
     * @param fields     field expressions describing the view columns
     */
    @Override
    public <T> void createTemporaryView(
        String path, DataStream<T> dataStream, Expression... fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    /**
     * Wraps a data stream in a query operation.
     *
     * <p>When {@code fields} is present the schema is derived from those expressions and the
     * time characteristic is validated; otherwise the schema comes from the stream type alone.
     */
    private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
        DataStream<T> dataStream, Optional<List<Expression>> fields) {
        TypeInformation<T> streamType = dataStream.getType();

        // get field names and types for all non-replaced fields
        FieldInfoUtils.TypeInfoSchema typeInfoSchema =
            fields.map(
                    f -> {
                        FieldInfoUtils.TypeInfoSchema fieldsInfo =
                            FieldInfoUtils.getFieldsInfo(
                                streamType, f.toArray(new Expression[0]));

                        // check if event-time is enabled
                        validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
                        return fieldsInfo;
                    })
                .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));

        return new JavaDataStreamQueryOperation<>(
            dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toTableSchema());
    }

    /**
     * Ensures a rowtime attribute is only used with an event-time stream environment.
     *
     * @param isRowtimeDefined whether the schema being converted defines a rowtime attribute
     * @throws ValidationException if a rowtime attribute is defined and the environment's time
     *                             characteristic is not {@link TimeCharacteristic#EventTime}
     */
    private void validateTimeCharacteristic(boolean isRowtimeDefined) {
        if (isRowtimeDefined
            && executionEnvironment.getStreamTimeCharacteristic()
            != TimeCharacteristic.EventTime) {
            throw new ValidationException(
                String.format(
                    "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
                    executionEnvironment.getStreamTimeCharacteristic()));
        }
    }
}
